// Copyright (c) 2020-present, INSPUR Co, Ltd. All rights reserved.
// This source code is licensed under Apache 2.0 License.

#include <ctime>
#include <memory>
#include <string>
#include <thread>

#include "db/db_test_util.h"
#include "db/memtable.h"
#include "db/range_del_aggregator.h"
#include "port/stack_trace.h"
#include "pure_mem/key_index/inline_uk_index.h"
#include "pure_mem/mvcc_key.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/slice_transform.h"

namespace rocksdb {

class InlineukIndexTest : public DBTestBase {
public:
  InlineukIndexTest() : DBTestBase("/InlineukIndexTest") {}

  static uint64_t getKey(const char *key) {
    Slice curKey = GetLengthPrefixedSlice(key);
    MvccKey iter_mk;
    iter_mk.parseKey(curKey);
    return atoi(iter_mk.userKey_.data());
  }

  static const char *genMVCCKey(std::string key) {
    uint32_t keySize = key.size() + 1 + 8;

    const uint32_t encoded_len =
        VarintLength(keySize) + keySize + 1 + 8 + VarintLength(0);
    char *ret = new char[encoded_len];
    memset(ret, 0, encoded_len);

    char *p = EncodeVarint32(ret, keySize);
    memcpy(p, key.data(), key.size());
    p += key.size() + 1;
    uint64_t packed = PackSequenceAndType(10000, kTypeValue);
    EncodeFixed64(p, packed);
    p += 8;
    p = EncodeVarint32(p, 0);

    return ret;
  }

  static void
  insertAndGet(int tail,
               InlineUserKeyIndex<const MemTableRep::KeyComparator &> *list) {
    bool res;
    std::cout << "Enter the loop" << std::endl;
    // Test the duplicate keys under stress
    SequenceNumber seq = 10000000;
    int record_sum = 500000;
    for (int i = 0; i < record_sum; i++, seq++) {
      std::string a = std::to_string(seq) + std::to_string(tail);
      const char *buf = genMVCCKey(a);
      EncodedVersionNode *cur = new EncodedVersionNode();
      cur->CASSetKey(nullptr, (void *)buf);
      res = list->Insert((const char *)cur);
      ASSERT_TRUE(res);
    }
    std::cout << "-----------insert all this thread records ok." << std::endl;
  }
};

TEST_F(InlineukIndexTest, memRelease) {
  InternalKeyComparator cmp(BytewiseComparator());
  MemTable::KeyComparator comparator_(cmp);

  SequenceNumber seq = 10000000;
  int record_sum = 1000000;
  for (size_t j = 0; j < 20; j++) {
    const MemTableRep::KeyComparator &cmp_ = comparator_;
    InlineUserKeyIndex<const MemTableRep::KeyComparator &> *list =
        new InlineUserKeyIndex<const MemTableRep::KeyComparator &>(
            cmp_, ART_ROWEX_FULLKEY);
    for (int i = 0; i < record_sum; i++, seq++) {
      std::string a = std::to_string(seq);

      const uint32_t encoded_len = 1 * 1024;
      char *buf = new char[encoded_len];
      memset(buf, 0, encoded_len);

      char *p = EncodeVarint32(buf, a.size() + 1 + 8);
      memcpy(p, a.data(), a.size());
      p += a.size() + 1;
      uint64_t packed = PackSequenceAndType(10000, kTypeValue);
      EncodeFixed64(p, packed);
      p += 8;
      p = EncodeVarint32(p, 0);

      EncodedVersionNode *cur = new EncodedVersionNode();
      cur->CASSetKey(nullptr, (void *)buf);
      bool res = list->Insert((const char *)cur);
      ASSERT_TRUE(res);
    }

    InlineUserKeyIndex<const MemTableRep::KeyComparator &>::Iterator *iter =
        list->GetIterator();
    iter->SeekToFirst();
    int nodeNUM = 0;
    while (iter->Valid()) {
      nodeNUM++;
      delete[] iter->key();
      iter->Next();
    }
    delete list;
    std::cout << "finish times: " << j << std::endl;
  }
}

TEST_F(InlineukIndexTest, ConcInsert) {

  InternalKeyComparator cmp(BytewiseComparator());
  MemTable::KeyComparator comparator_(cmp);
  const MemTableRep::KeyComparator &cmp_ = comparator_;
  InlineUserKeyIndex<const MemTableRep::KeyComparator &> *list =
      new InlineUserKeyIndex<const MemTableRep::KeyComparator &>(
          cmp_, ART_ROWEX_FULLKEY);

  std::thread test1(InlineukIndexTest::insertAndGet, 0, list);
  std::thread test2(InlineukIndexTest::insertAndGet, 1, list);
  std::thread test3(InlineukIndexTest::insertAndGet, 2, list);
  std::thread test4(InlineukIndexTest::insertAndGet, 3, list);
  std::thread test5(InlineukIndexTest::insertAndGet, 4, list);
  std::thread test6(InlineukIndexTest::insertAndGet, 5, list);
  std::thread test7(InlineukIndexTest::insertAndGet, 6, list);
  std::thread test8(InlineukIndexTest::insertAndGet, 7, list);
  std::thread test9(InlineukIndexTest::insertAndGet, 8, list);
  std::thread test10(InlineukIndexTest::insertAndGet, 9, list);

  test1.join();
  test2.join();
  test3.join();
  test4.join();
  test5.join();
  test6.join();
  test7.join();
  test8.join();
  test9.join();
  test10.join();
  std::cout << "+++++++++ all records insert ok.!!!!" << std::endl;

  InlineUserKeyIndex<const MemTableRep::KeyComparator &>::Iterator *iter =
      list->GetIterator();
  iter->SeekToFirst();
  size_t totalKVNum = 0;
  while (iter->Valid()) {
    uint64_t keyss = InlineukIndexTest::getKey(iter->key());
    assert(100000000 + totalKVNum == keyss);
    totalKVNum++;
    iter->Next();
  }
  std::cout << "-----------insert all records[" << totalKVNum << "] ok."
            << std::endl;
  delete list;
}

} // namespace rocksdb

int main(int argc, char **argv) {
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}
